-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add single worker execution mode for native execution #24172
base: master
Are you sure you want to change the base?
Conversation
dfa9ee8
to
0d9137e
Compare
0d9137e
to
d71d256
Compare
Can you help to explain why this feature is or has to be tied to native execution? Perhaps describe the background and motivation? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High level comments
- Do we want to support single node execution on per-query basis?
This can be useful to improve latency of tiny queries running on a large cluster. For example a user may know that a query is small and may decide to run it on a multi node cluster in a single node mode.
If decided to support it is necessary to make sure the session property is used consistently through the code.
If decided not to support it for now I think the session property should be removed and a configuration property should only be used.
- Should the single node execution mode be native specific?
When running a Java cluster deployment with a dedicated coordinator and a dedicated worker (workers) additional exchanges at worker - coordinator boundary are necessary.
I'm thinking if a simpler mental model would be to always add coordinator-to-worker exchanges when single node execution is requested?
@@ -328,6 +328,7 @@ public final class SystemSessionProperties | |||
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. | |||
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; | |||
private static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled"; | |||
private static final String NATIVE_SINGLE_WORKER_EXECUTION = "native_single_worker_execution"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should we stay consistent and use node
instead of worker
(e.g.: query_max_memory_per_node
, force_single_node_output
, etc.). Also maybe add the _enabled
suffix to make it sound more natural, e.g.: single_node_execution_enabled
, isSingleNodeExecutionEnabled(...)
@@ -328,6 +328,7 @@ public final class SystemSessionProperties | |||
// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. | |||
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; | |||
private static final String NATIVE_EXECUTION_ENABLED = "native_execution_enabled"; | |||
private static final String NATIVE_SINGLE_WORKER_EXECUTION = "native_single_worker_execution"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a good reason why we want to make it native specific? It feels like this property does not necessarily have to be native specific. However when native execution is enabled the planner may need to add additional exchanges to make it work on native.
@@ -233,6 +233,7 @@ public class FeaturesConfig | |||
private boolean pushRemoteExchangeThroughGroupId; | |||
private boolean isOptimizeMultipleApproxPercentileOnSameFieldEnabled = true; | |||
private boolean nativeExecutionEnabled; | |||
private boolean nativeSingleWorkerExecution; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: same comments as for the session property
@@ -94,7 +96,7 @@ public static SubPlan finalizeSubPlan( | |||
PartitioningHandle partitioningHandle) | |||
{ | |||
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle); | |||
if (!forceSingleNode) { | |||
if (!forceSingleNode && !(isNativeExecutionEnabled(session) && isNativeSingleWorkerExecution(session))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe set the existing forceSingleNode
accordingly instead
} | ||
|
||
@Override | ||
public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Void> context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are other nodes that need to be run on coordinator:
https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java#L181
https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java#L235
Do we need to add an exchange to support those as well?
@@ -813,7 +814,11 @@ public PlanOptimizers( | |||
costCalculator, | |||
ImmutableSet.of(new ScaledWriterRule()))); | |||
|
|||
if (!forceSingleNode) { | |||
if (featuresConfig.isNativeExecutionEnabled() && featuresConfig.isNativeSingleWorkerExecution()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if a simpler mental model would be to always add worker to coordinator
exchanges when single worker execution is enabled.
This way:
- single worker execution can be enabled for normal clusters with more than a single worker and a single coordinator (with schedule on coordinator disabled) on per query basis
- For Java execution if coordinator scheduling is enabled an extra exchange is not going to hurt
For example this condition can be kept as isSingleNodeExecutionEnabled(session)
and we can call the AddExchangeForNativeSingleWorker
as AddWorkerToCoordinatorExchanges
@@ -813,7 +814,11 @@ public PlanOptimizers( | |||
costCalculator, | |||
ImmutableSet.of(new ScaledWriterRule()))); | |||
|
|||
if (!forceSingleNode) { | |||
if (featuresConfig.isNativeExecutionEnabled() && featuresConfig.isNativeSingleWorkerExecution()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to be consistent on whether session property is supported or not. Otherwise someone may try to enable session property without realizing it not being taking into account everywhere, and end up with a broken plan.
@@ -55,7 +57,7 @@ public void setEnabledForTesting(boolean isSet) | |||
@Override | |||
public boolean isEnabled(Session session) | |||
{ | |||
return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session); | |||
return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !(isNativeExecutionEnabled(session) && isNativeSingleWorkerExecution(session)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe let's throw an explicit exception saying that MergeJoin
is not supported with single node execution (if requested)
|
||
import java.util.Optional; | ||
|
||
public class TestPrestoNativeWindowQueriesSingleWorker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to test EXPLAIN ANALYZE
, metadata delete and select from system tables (as it has a special case for native execution)
} | ||
|
||
@Override | ||
public void testScaleWriters() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scale writers should have no effect in single node? Do we nee this test?
We can also potentially use HBO/CBO to decide to run some in single node mode |
Description